Azure Firewall/Template - Logic App and Automation Account for Adding O365 Rules to Azure Firewall/o365_rules.py (227 lines of code) (raw):

#!/usr/bin/env python3 import requests import argparse import json import re # Helper functions # True if any of the urls contained in the URL list contains a wildcard ('*') def urls_contain_wildcard(urls): for url in urls: if '*' in url: return True return False # Check whether URLs are correct: # - Wildcard needs to be in the beginning of the string, not valid in the middle def verify_urls(urls): corrected_urls = [] for url in urls: if url.find('*') <= 0: corrected_urls.append(url) else: corrected_urls.append(url[url.find('*'):]) if args.verbose: print("WARNING: URL {0} reduced to {1}".format(url, url[url.find('*'):])) return corrected_urls # Filters out IP addresses based on the args.ip_version parameter (can be ipv4, ipv6 or both) def filter_ips(ip_list): # For both versions, dont filter if args.ip_version == 'both': return ip_list else: filtered_ips = [] for ip in ip_list: # For 'ipv4', return only those who match the IPv4 check if is_ipv4(ip) and (args.ip_version == 'ipv4'): filtered_ips.append(ip) # For 'ipv6', return only non-IPv4 addresses (assumed to be ipv6 then) elif (not is_ipv4(ip)) and (args.ip_version == 'ipv6'): filtered_ips.append(ip) # if args.verbose: # print("DEBUG: IP list {0} filtered to {1}".format(str(ip_list), str(filtered_ips))) return filtered_ips # True if parameter is an ipv4 address def is_ipv4(ip_address): return bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(?:/\d{1,2}|)$",str(ip_address))) # Arguments parser = argparse.ArgumentParser(description='Generate an ARM template to create a Rule Collection Group in an Azure policy with rules that allow access to M365 endpoints.') parser.add_argument('--policy-name', dest='policy_name', action='store', default="", help='Name for the Azure Firewall Policy. The default is "o365policy"') parser.add_argument('--policy-sku', dest='policy_sku', action='store', default="Premium", help='SKU for the Azure Firewall Policy. Possible values: Standard, Premium (default: Premium)') parser.add_argument('--do-not-create-policy', dest='dont_create_policy', action='store_true', default=False, help='If specified, do not include ARM code for the policy, only for the rule collection group. Use if the policy already exists.') parser.add_argument('--rcg-name', dest='rcg_name', action='store', default="o365_rulecollectiongroup", help='Name for the Rule Collection Group to create in the Azure Firewall Policy. The default is "o365"') parser.add_argument('--rcg-priority', dest='rcg_prio', action='store', default="10000", help='Priority for the Rule Collection Group to create in the Azure Firewall Policy. The default is "10000"') parser.add_argument('--format', dest='format', action='store', default="json", help='Output format. Possible values: json, none') parser.add_argument('--ip-version', dest='ip_version', action='store', default="ipv4", help='IP version of AzFW rules. Possible values: ipv4, ipv6, both. Default: ipv4') parser.add_argument('--pretty', dest='pretty', action='store_true', default=False, help='Print JSON in pretty mode (default: False)') parser.add_argument('--verbose', dest='verbose', action='store_true', default=False, help='Run in verbose mode (default: False)') args = parser.parse_args() # Variables o365_endpoints_url = 'https://endpoints.office.com/endpoints/worldwide?clientrequestid=b10c5ed1-bad1-445f-b386-b919946339a7' app_rules = [] net_rules = [] rcg_name = args.rcg_name rcg_prio = args.rcg_prio rc_app_name = 'o365app' rc_app_prio = "11000" rc_net_name = 'o365net' rc_net_prio = "10900" # Get O365 endpoints from the Internet response = requests.get(o365_endpoints_url) if response.status_code == 200: if args.verbose: print ("DEBUG: File {0} downloaded successfully".format(o365_endpoints_url)) try: # Deserialize JSON to object variable o365_data = json.loads(response.text) except Exception as e: print("Error deserializing JSON content: {0}".format(str(e))) sys.exit(1) # Go through the rules cnt_apprules = 0 cnt_netrules_ip = 0 cnt_netrules_fqdn = 0 cnt_endpoints = 0 for endpoint in o365_data: cnt_endpoints += 1 # IP-based Net Rule if ('ips' in endpoint): cnt_netrules_ip += 1 if ('ips' in endpoint) and (('tcpPorts' in endpoint) or ('udpPorts' in endpoint)): new_rule = { 'name': 'id' + str(endpoint['id']) + '-' + str(endpoint['serviceAreaDisplayName']).replace(" ", ""), 'ruleType': 'NetworkRule', 'sourceAddresses': [ '*' ], 'destinationAddresses': filter_ips(endpoint['ips']), 'destinationFqdns': [] } if 'tcpPorts' in endpoint: new_rule['ipProtocols'] = [ 'tcp' ] new_rule['destinationPorts'] = str(endpoint['tcpPorts']).split(",") else: new_rule['ipProtocols'] = [ 'udp' ] new_rule['destinationPorts'] = str(endpoint['udpPorts']).split(",") net_rules.append(new_rule) # Watch out for UDP+TCP! if ('udpPorts' in endpoint) and ('tcpPorts' in endpoint): print("WARNING: Endpoint ID {0} has both TCP and UDP ports!".format(endpoint['id'])) else: if not ('ips' in endpoint): print('ERROR: Endpoint ID {0} is IP-based with wildcards, but does not have ips'.format(endpoint['id'])) if not ('udpPorts' in endpoint): print('ERROR: Endpoint ID {0} is IP-based with wildcards, but does not have udpPorts'.format(endpoint['id'])) if args.verbose: print('DEBUG: endpoint:', str(endpoint)) # App Rule elif ('tcpPorts' in endpoint) and ((endpoint['tcpPorts'] == "80,443") or (endpoint['tcpPorts'] == "443,80") or (endpoint['tcpPorts'] == "443") or (endpoint['tcpPorts'] == "80")): cnt_apprules += 1 if 'urls' in endpoint: new_rule = { 'name': 'id' + str(endpoint['id']) + '-' + str(endpoint['serviceAreaDisplayName']).replace(" ", ""), 'ruleType': 'ApplicationRule', 'sourceAddresses': [ '*' ], 'targetFqdns': verify_urls(endpoint['urls']), 'protocols': [] } dst_ports = str(endpoint['tcpPorts']).split(",") if '80' in dst_ports: new_rule['protocols'].append({'protocolType': 'Http', 'port': 80}) if '443' in dst_ports: new_rule['protocols'].append({'protocolType': 'Https', 'port': 443}) app_rules.append(new_rule) else: print('ERROR Endpoint ID {0} is web-based but does not have URLs'.format(endpoint['id'])) # FQDN-based Net Rule else: cnt_netrules_fqdn += 1 if ('urls' in endpoint) and (('tcpPorts' in endpoint) or ('udpPorts' in endpoint)): new_rule = { 'name': 'id' + str(endpoint['id']) + '-' + str(endpoint['serviceAreaDisplayName']).replace(" ", ""), 'ruleType': 'NetworkRule', 'sourceAddresses': [ '*' ], 'destinationAddresses': [], 'destinationFqdns': endpoint['urls'], } if 'tcpPorts' in endpoint: new_rule['ipProtocols'] = [ 'tcp' ] new_rule['destinationPorts'] = str(endpoint['tcpPorts']).split(",") else: new_rule['ipProtocols'] = [ 'udp' ] new_rule['destinationPorts'] = str(endpoint['udpPorts']).split(",") net_rules.append(new_rule) # Watch out for UDP+TCP! if ('udpPorts' in endpoint) and ('tcpPorts' in endpoint): print("WARNING: Endpoint ID {0} has both TCP and UDP ports!".format(endpoint['id'])) else: if not ('urls' in endpoint): print('ERROR: Endpoint ID {0} is IP-based with wildcards, but does not have urls'.format(endpoint['id'])) if not ('udpPorts' in endpoint): print('ERROR: Endpoint ID {0} is IP-based with wildcards, but does not have udpPorts'.format(endpoint['id'])) if args.verbose: print('DEBUG: endpoint:', str(endpoint)) ########## # Output # ########## # Generate JSON would be creating an object and serialize it if args.format == "json": api_version = "2021-02-01" azfw_policy_name = args.policy_name arm_template = { '$schema': 'https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#', 'contentVersion': '1.0.0.0', 'parameters': {}, 'variables': { 'location': '[resourceGroup().location]' }, 'resources': [] } if not args.dont_create_policy: resource_policy = { 'type': 'Microsoft.Network/firewallPolicies', 'apiVersion': api_version, 'name': azfw_policy_name, 'location': '[variables(\'location\')]', 'properties': { 'sku': { 'tier': args.policy_sku }, 'dnsSettings': { 'enableProxy': 'true' }, 'threatIntelMode': 'Alert' } } arm_template['resources'].append(resource_policy) resource_rcg = { 'type': 'Microsoft.Network/firewallPolicies/ruleCollectionGroups', 'apiVersion': api_version, 'name': azfw_policy_name + '/' + rcg_name, 'dependsOn': [], 'location': '[variables(\'location\')]', 'properties': { 'priority': rcg_prio, 'ruleCollections': [] } } if not args.dont_create_policy: resource_rcg['dependsOn'].append('[resourceId(\'Microsoft.Network/firewallPolicies\', \'' + azfw_policy_name +'\')]'), resource_net_rc = { 'ruleCollectionType': 'FirewallPolicyFilterRuleCollection', 'name': rc_net_name, 'priority': rc_net_prio, 'action': { 'type': 'allow' }, 'rules': net_rules } resource_app_rc = { 'ruleCollectionType': 'FirewallPolicyFilterRuleCollection', 'name': rc_app_name, 'priority': rc_app_prio, 'action': { 'type': 'allow' }, 'rules': app_rules } resource_rcg['properties']['ruleCollections'].append(resource_net_rc) resource_rcg['properties']['ruleCollections'].append(resource_app_rc) arm_template['resources'].append(resource_rcg) if args.pretty: print(json.dumps(arm_template, indent=4, sort_keys=True)) else: print(json.dumps(arm_template)) elif args.format == "none": if args.verbose: print('DEBUG: {0} endpoints analized: {1} app rules, {2} FQDN-based net rules and {3} IP-based net rules'.format(str(cnt_endpoints), str(cnt_apprules), str(cnt_netrules_fqdn), str(cnt_netrules_ip))) # print('DEBUG: Net rules:', str(net_rules)) # print('DEBUG: App rules:', str(app_rules)) else: print ("Format", args.format, "not recognized!")